SnowflakeのQuickstart「Getting Started with Snowflake and Amazon Data Firehose (ADF)」をやってみた

SnowflakeのQuickstart「Getting Started with Snowflake and Amazon Data Firehose (ADF)」をやってみた

Clock Icon2024.05.10

さがらです。

先月、Amazon Data Firehoseを用いてSnowflakeに対してデータをストリーミングする機能が一般提供となりました。東京リージョンでも使えます!

これを受けて、関連するQuickstartである「Getting Started with Snowflake and Amazon Data Firehose (ADF)」をやってみたので、本記事でまとめてみます。

Quickstartの内容

このQuickstartですが、OpenSky Networkというリアルタイムの航空データを提供しているサービスのAPIを用いて、JSON形式で航空状況を表す時系列データを取得し、Data Firehose経由でSnowflakeにストリーミングする、ということを実際に体験できる内容となっています。

下図はQuickstartのページからの引用ですが、このようなアーキテクチャとなっています。より具体的には、EC2インスタンス上でOpenSkyのAPIを叩いてデータを取得してData FirehoseにストリーミングするPython処理を実行し、Snowflakeでストリーミングされたデータをクエリする、という構成です。

前提

本Quickstartを進める上で、以下の前提で進めます。

  • ストリームデータを生成するEC2は「パブリックサブネット」に配置
    • このため、Quickstart上で言及されているPrivateLink周りの処理は行いません
  • VPC、Subnet、Security Group、EC2、S3など、Data Firehose以外のリソースについてはCloudFormationで定義(詳細は後述)
  • AWSは「アジアパシフィック(東京)」リージョン
  • Snowflakeのアカウントは、クラウドプラットフォームがAWS、リージョンは東京、Enterpriseエディション、で事前に作成済

AWS上の準備(2. Provision a Linux jumphost in AWS)

CloudFormationの実行

本Quickstart上でもEC2を立てるCloudFormationを実行するリンクがあるのですが、そのリンク先に記載されているjsonファイル(https://snowflake-corp-se-workshop.s3.us-west-1.amazonaws.com/VHOL_Snowflake_KDF/kdf-bastion.json)を変換したコードを使って各AWSリソースを構築します。

具体的には、以下の内容でClaude3 Sonnetに依頼して変換してもらいました。(個人的に、CloudFormationのコードの生成ではClaude3 Sonnetをよく使っています。)

※上述のJSONのコードを貼り付けた上で
このコードについて、以下の内容に沿って変換してください。
・YAMLにすること
・VPC、Public Subnet、SecurityGroupなど必要なリソースもこのコード上で定義すること。
・Security Groupでは、特定のIPからのアクセスに限定させること
・Firehoseのbackup settings用のS3バケットの作成も追加すること
・名前をつけることが出来るリソースについて、全て名前の指定を行うこと

この依頼内容で変換してもらったコードがこちらです。(parameterのところなど、手作業で少しだけ編集しているところもあります。)

AWSTemplateFormatVersion: '2010-09-09'
Description: Create a Jumphost to run Kinesis producer for ingesting data into firehose
Parameters:
  InstanceType:
    Description: JumpHost EC2 instance types
    Type: String
    Default: t3.micro
    AllowedValues:
      - t2.micro
      - t2.small
      - t2.medium
      - t3.micro
      - t3.small
      - t3.medium
  LatestAmiId:
    Type: 'AWS::SSM::Parameter::Value<AWS::EC2::Image::Id>'
    Description: JumpHost EC2 AMI Id
    Default: "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2"
    AllowedValues:
      - "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2"
  RemoteAccessCIDR:
    Description: CIDR IP range that can access the Jumphost
    Type: String
    Default: 0.0.0.0/0
    AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})'
    ConstraintDescription: must be a valid CIDR range of the form x.x.x.x/x
Resources:
  VPC:
    Type: 'AWS::EC2::VPC'
    Properties:
      CidrBlock: 10.0.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      InstanceTenancy: default
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-VPC'
  PublicSubnet:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [0, !GetAZs '']
      CidrBlock: 10.0.0.0/24
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-Public-Subnet'
  InternetGateway:
    Type: 'AWS::EC2::InternetGateway'
    Properties:
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-IGW'
  VPCGatewayAttachment:
    Type: 'AWS::EC2::VPCGatewayAttachment'
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway
  RouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-RouteTable'
  RouteTableInternetGatewayRoute:
    Type: 'AWS::EC2::Route'
    DependsOn: VPCGatewayAttachment
    Properties:
      RouteTableId: !Ref RouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
  RouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnet
      RouteTableId: !Ref RouteTable
  FirehoseBackupBucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: !Sub '${AWS::StackName}-backup-bucket'
      AccessControl: BucketOwnerFullControl
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  JumpHostSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Allow SSH from specified CIDR range
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !Ref RemoteAccessCIDR
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-JumpHost-SG'
  rJumpHostInstanceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - ec2.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM'
      RoleName: !Sub '${AWS::StackName}-JumpHostRole'
      Policies:
        - PolicyName: !Sub '${AWS::StackName}-${AWS::AccountId}-jumphostpolicy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 's3:*'
                  - 'kinesis:*'
                  - 'firehose:*'
                Resource: '*'
  rJumpHostInstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Path: /
      Roles:
        - !Ref rJumpHostInstanceRole
      InstanceProfileName: !Sub '${AWS::StackName}-JumpHostInstanceProfile'
  rJumpHost:
    Type: 'AWS::EC2::Instance'
    Metadata:
      'AWS::CloudFormation::Init': {}
    Properties:
      IamInstanceProfile: !Ref rJumpHostInstanceProfile
      ImageId: !Ref LatestAmiId
      InstanceType: !Ref InstanceType
      UserData:
        'Fn::Base64':
          !Sub |
            #!/bin/bash
            yum -y install jq python3-pip
            echo ${AWS::Region} > /tmp/region
            echo ${AWS::StackName} > /tmp/stackName
            curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "/tmp/awscliv2.zip"
            unzip "/tmp/awscliv2.zip" -d /tmp
            /tmp/aws/install
            chmod 755 /usr/local/bin/aws
            aws s3 cp s3://snowflake-corp-se-workshop/VHOL_Snowflake_KDF/adf-producer.py /tmp/adf-producer.py
            pip3 install geohash2 pytz boto3 requests urllib3==1.26.6
            # Helper function
            function error_exit
            {
              /opt/aws/bin/cfn-signal -e 1 -r "$1" '${EC2WaitHandle}'
              exit 1
            }
            # Install the basic system configuration
            /opt/aws/bin/cfn-init -s ${AWS::StackId} -r rJumpHost --region ${AWS::Region} || error_exit 'Failed to run cfn-init'
            # All done so signal success
            /opt/aws/bin/cfn-signal -e 0 -r "setup complete" '${EC2WaitHandle}'
      NetworkInterfaces:
        - AssociatePublicIpAddress: true
          DeviceIndex: 0
          GroupSet:
            - !Ref JumpHostSecurityGroup
          SubnetId: !Ref PublicSubnet
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-jumphost'
  EC2WaitHandle:
    Type: 'AWS::CloudFormation::WaitConditionHandle'
  EC2WaitCondition:
    Type: 'AWS::CloudFormation::WaitCondition'
    Properties:
      Handle: !Ref EC2WaitHandle
      Timeout: 600
Outputs:
 JumpHostPublicIP:
   Description: Public IP of the Jumphost
   Value: !GetAtt 'rJumpHost.PublicIp'
 InstanceSecurityGroupID:
   Description: The ID of the security group created for the Jumphost
   Value: !Ref JumpHostSecurityGroup
 JumpHostId:
   Description: Jump Host Instance ID
   Value: !Ref rJumpHost
 FirehoseBackupBucketName:
   Description: Bucket name for Firehose backup
   Value: !Ref FirehoseBackupBucket

このコードをファイル化して、CloudFormationを実行します。実行する際はRemoteAccessCIDRだけご注意ください。今回の検証範囲ではEC2からData Firehoseを介してSnowflakeへインターネット上で通信ができればよいので、EC2へアクセスするIP制限を設ける意味合いで私は自宅のグローバルIPを記入しました。

EC2をAWS Systems Manager Session Managaerから操作

続いて、EC2をAWS Systems Manager Session Managaerから操作してFirehoseからSnowflakeの認証に使うキーペアを生成します。

まず、Quickstartの内容に沿って、AWS Systems Manager Session Managaerで設定からLinux shell profile/bin/bashに変更して保存を押します。

次に、セッションからセッションの開始を押し、先程CloudFormationで立てたインスタンスへログインします。

Quickstartに沿って、キーペアの作成から出力を行います。生成の際に用いたPasswordと、catで確認した公開鍵と秘密鍵は後で使用しますので、すぐにコピペできるようにしておきましょう。

cd $HOME
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

grep -v KEY rsa_key.pub | tr -d '\n' | awk '{print $1}' > pub.Key
cat pub.Key

grep -v KEY rsa_key.p8 | tr -d '\n' | awk '{print $1}' > priv.Key
cat priv.Key

Snowflake上の準備(3. Prepare the Snowflake cluster for streaming)

続いて、Snowflake上で準備を進めます。

まず、使用するユーザー、データベース、ウェアハウス、ロールを準備します。一番最後のWITHから始まるSELECT文で、Data Firehoseの設定で使用するAccount Identifierが確認できるので、こちらの値もすぐにコピペできるようにしておきましょう。

-- Set default value for multiple variables
-- For purpose of this workshop, it is recommended to use these defaults during the exercise to avoid errors
-- You should change them after the workshop
SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'ADF_STREAMING_DB';
SET WH = 'ADF_STREAMING_WH';
SET ROLE = 'ADF_STREAMING_RL';

USE ROLE ACCOUNTADMIN;

-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD  COMMENT='STREAMING USER';

-- CREATE ROLES
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);

-- CREATE DATABASE AND WAREHOUSE
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';

-- GRANTS
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);

-- SET DEFAULTS
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;

-- RUN FOLLOWING COMMANDS TO FIND YOUR ACCOUNT IDENTIFIER, COPY IT DOWN FOR USE LATER
-- IT WILL BE SOMETHING LIKE <organization_name>-<account_name>
-- e.g. ykmxgak-wyb52636

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

次に、作成したユーザーに先ほどEC2上で確認した公開鍵を割り当てます。

use role accountadmin;
alter user streaming_user set rsa_public_key='< pubKey >';

作成したユーザーでログインしてスキーマとテーブルの作成を行う

次に、先程作成したユーザーでSnowflakeアカウントにログインしなおし、スキーマとテーブルの作成を行います。(Quickstart上ではテーブル作成はもう少し後で行っていますが、このタイミングでまとめて行ったほうが画面の行き来が少なくて済むのでこのタイミングで行っています。)

SET DB = 'ADF_STREAMING_DB';
SET SCHEMA = 'ADF_STREAMING_SCHEMA';

USE IDENTIFIER($DB);
CREATE OR REPLACE SCHEMA IDENTIFIER($SCHEMA);

use ADF_STREAMING_DB;
use schema ADF_STREAMING_SCHEMA;
create or replace TABLE ADF_STREAMING_TBL (
	ORIG VARCHAR(20),
	UTC NUMBER(38,0),
	ALT VARCHAR(20),
	ICAO VARCHAR(20),
	LON VARCHAR(20),
	ID VARCHAR(20),
	DEST VARCHAR(20),
	LAT VARCHAR(20)
);

Amazon Data Firehoseの設定(4. Create an ADF delivery stream)

次に、Data Firehoseの設定に移ります。

Amazon Data Firehoseのページから、Firehoseストリームを作成を押します。

ここから、各設定値を入力していきます。

ソースDirect PUT送信先Snowflakeを選択します。

Firehose ストリーム名は任意の名称を入れましょう。ここで決めた名称は、後でEC2からPython処理を実行する際に引数として使うことになります。

次は送信先の設定となりますが、以下の内容に沿って入力してください。

  • SnowflakeアカウントURLは、形式に記載されたURLのうち、[account identifier]を先程Snowflakeの画面で確認したものに変換して入力
  • ユーザーは、STREAMING_USERと入力
  • プライベートキーは、EC2上で作成したキーペアの秘密鍵の値を入力
  • パスフレーズは、EC2上でキーペア作成時に用いたパスワードを入力
  • ロールは、カスタムのSnowflakeロールを使用するを選択し、ADF_STREAMING_RLと入力
  • Snowflakeデータベースは、ADF_STREAMING_DBと入力
  • Snowflakeスキーマは、ADF_STREAMING_SCHEMAと入力
  • Snowflakeテーブルは、ADF_STREAMING_TBLと入力

バックアップの設定では、S3バックアップバケットにCloudFormationで構成されたS3バケットを選択します。

あとは右下のFirehoseストリームを作成を押せばOKです。

データの取得とクエリ実行(5. Ingest and Query data in Snowflake)

最後に、実際にEC2上でデータをストリーミングするPython処理を実行し、Data Firehose経由でSnowflakeにロードされているかを確認してみます。

EC2にログインし、ストリームデータを生成するPythonを実行

まず、EC2にログインして、ストリームデータを生成するPythonを実行します。

実際に実行するPythonコードですが、https://snowflake-corp-se-workshop.s3.us-west-1.amazonaws.com/VHOL_Snowflake_KDF/adf-producer.pyのリンクからダウンロード出来ます。

コードの内容としては下記のようになっています。

import pytz,boto3,time,requests,sys,re,json
from datetime import datetime
from botocore.config import Config

global tbl,maxNumRecords
maxNumRecords=250

def dumpToKDF(rec):
 for j in range(0,len(rec),maxNumRecords):
    records=rec[j:j+maxNumRecords]
    numRec=str(len(records))
    try:
      for item in records:
        print(json.dumps(item))
        result = kdfClient.put_record(DeliveryStreamName=deliveryStreamName,Record={'Data': json.dumps(item)})
    except Exception as err:
            print("Error:", err)

#Main
if len(sys.argv) != 2:
   print("Usage: python3 ",sys.argv[0], " <delivery stream name>")
   sys.exit()

deliveryStreamName=sys.argv[1]
headers = {'Content-type': 'application/json'}
url = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
region = requests.get(url).text[:-1]

kdf = boto3.Session()

kdfClient = kdf.client('firehose', region_name=region, config=Config(read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10}))

url='http://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky'

kdfflight=[]

while True:
  try:
    results = requests.get(url, headers=headers)
    r=json.loads(results.text)
    print('Headers:')
  except:
    print(results.headers)
    pass

  ts=int(time.time())

  print(datetime.now(pytz.timezone('America/Los_Angeles')).strftime("%D %H:%M:%S"))

  for st in r:
       (icao,id,utc,lon,lat,alt,dest,orig)=(st['icao'],st['id'],st['utc'],st['lon'],st['lat'],st['alt'],st['dest'],st['orig'])
       if dest=='KSFO' or dest=='KSJC' or dest=='KOAK' or dest=='KPAO' or dest=='KSQL' or dest=='KRHV' or dest=='KNUQ' or dest=='KHWD' or dest=='KLVK' or dest=='KHAF':
         kdfRecord={}
         kdfRecord['utc']=str(utc)
         kdfRecord['icao']=icao
         kdfRecord['id']=id
         kdfRecord['lat']=str(lat)
         kdfRecord['lon']=str(lon)
         kdfRecord['alt']=str(int(alt))
         kdfRecord['dest']=dest
         kdfRecord['orig']=orig
         kdfflight.append(kdfRecord)

  dumpToKDF(kdfflight)
  kdfflight=[]

  print('-----------')
  print('Sleeping for 20 seconds')
  print('-----------')
  time.sleep(20)

では、キーペアを生成したときと同じくAWS Systems Manager Session Managaerから対象のEC2にログインし、以下のコマンドを実行します。

python3 /tmp/adf-producer.py <Data Firehoseで設定したストリーム名>

これで、下図のようにJSON形式でデータがどんどん生成されていけばOKです。

Snowflakeで実際にテーブルをクエリ

それではSnowflakeにログインし、ちゃんとテーブルにストリーミングされているかを確認してみます。

まずはシンプルなSELECT文を実行してみると、ちゃんとデータがストリーミングされていることがわかります!

select * from adf_streaming_tbl;

次に、このテーブルからより実際に使いやすくするための変換を行うビューを定義します。タイムスタンプをさまざまなタイムゾーンに変換、SnowflakeのGeohash関数を使用してGrafanaなどの時系列視覚化ツールで使用できる Geohashを生成、st_distance関数を使って飛行機とサンフランシスコ空港の間の距離を計算、といったことを行っています。

create or replace view flights_vw
  as select
    utc::timestamp_ntz ts_utc,
    CONVERT_TIMEZONE('UTC','America/Los_Angeles',ts_utc::timestamp_ntz) as ts_pt,
    alt::integer alt,
    dest::string dest,
    orig::string orig,
    id::string id,
    icao::string icao,
    lat::float lat,
    lon::float lon,
    st_geohash(to_geography(st_makepoint(lon, lat)),12) geohash,
    st_distance(st_makepoint(-122.366340, 37.616245), st_makepoint(lon, lat))/1609::float dist_to_sfo,
    year(ts_pt) yr,
    month(ts_pt) mo,
    day(ts_pt) dd,
    hour(ts_pt) hr
FROM adf_streaming_tbl;

実際にビューに対してSELECT文を実行すると下図のようなデータが得られます。このようなデータがニアリアルタイムで得られるため、今現在の空港と飛行機の位置関係を分析したいときにも使えますね!

select * from flights_vw;

お片付け

Quickstartの内容としては以上となるので、これ以上課金がされないようにAWSのリソースを削除しておきましょう。

まずCloudFormationでのスタックの削除を行います。(もしS3バケットが削除できない場合には、バケットの中身を空にしてからスタックを削除してください。)

次に、手動で定義したData Firehoseのストリームの削除を行います。これでお片付けは完了です!

最後に

SnowflakeのQuickstart「Getting Started with Snowflake and Amazon Data Firehose (ADF)」をやってみたのでその内容をまとめてみました。

実はData Firehoseを触るのは初めてだったのですが、Data FirehoseとSnowflakeを使って簡単にデータストリーミングパイプラインが構築できることがわかりましたね!

今回はパブリックサブネット経由での実行だったこともあるので、本番運用で想定されるプライベートサブネット経由での実行の場合はもう少しネットワーク周りの設定が必要になると思いますが、本記事が参考になると嬉しいです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.